/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.core.fs;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.util.function.SupplierWithException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
 * A file system that limits the number of concurrently open input streams, output streams, and
 * total streams for a target file system.
 *
 * <p>This file system can wrap another existing file system in cases where the target file system
 * cannot handle certain connection spikes and connections would fail in that case. This happens,
 * for example, for very small HDFS clusters with few RPC handlers, when a large Flink job tries to
 * build up many connections during a checkpoint.
 *
 * <p>The filesystem may track the progress of streams and close streams that have been inactive for
 * too long, to avoid locked streams of taking up the complete pool. Rather than having a dedicated
 * reaper thread, the calls that try to open a new stream periodically check the currently open
 * streams once the limit of open streams is reached.
 */
@Internal
public class LimitedConnectionsFileSystem extends FileSystem {

    private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);

    /** The original file system to which connections are limited. */
    private final FileSystem originalFs;

    /** The lock that synchronizes connection bookkeeping. */
    private final ReentrantLock lock;

    /** Condition for threads that are blocking on the availability of new connections. */
    private final Condition available;

    /** The maximum number of concurrently open output streams. */
    private final int maxNumOpenOutputStreams;

    /** The maximum number of concurrently open input streams. */
    private final int maxNumOpenInputStreams;

    /** The maximum number of concurrently open streams (input + output). */
    private final int maxNumOpenStreamsTotal;

    /** The nanoseconds that a opening a stream may wait for availability. */
    private final long streamOpenTimeoutNanos;

    /**
     * The nanoseconds that a stream may spend not writing any bytes before it is closed as
     * inactive.
     */
    private final long streamInactivityTimeoutNanos;

    /** The set of currently open output streams. */
    @GuardedBy("lock")
    private final HashSet<OutStream> openOutputStreams;

    /** The set of currently open input streams. */
    @GuardedBy("lock")
    private final HashSet<InStream> openInputStreams;

    /** The number of output streams reserved to be opened. */
    @GuardedBy("lock")
    private int numReservedOutputStreams;

    /** The number of input streams reserved to be opened. */
    @GuardedBy("lock")
    private int numReservedInputStreams;

    // ------------------------------------------------------------------------

    /**
     * Creates a new output connection limiting file system.
     *
     * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
     * then they are terminated as "inactive", to prevent that the limited number of connections
     * gets stuck on only blocked threads.
     *
     * @param originalFs The original file system to which connections are limited.
     * @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no
     *     limit).
     */
    public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal) {
        this(originalFs, maxNumOpenStreamsTotal, 0, 0);
    }

    /**
     * Creates a new output connection limiting file system.
     *
     * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
     * then they are terminated as "inactive", to prevent that the limited number of connections
     * gets stuck on only blocked threads.
     *
     * @param originalFs The original file system to which connections are limited.
     * @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no
     *     limit).
     * @param streamOpenTimeout The maximum number of milliseconds that the file system will wait
     *     when no more connections are currently permitted.
     * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any bytes
     *     before it is closed as inactive.
     */
    public LimitedConnectionsFileSystem(
            FileSystem originalFs,
            int maxNumOpenStreamsTotal,
            long streamOpenTimeout,
            long streamInactivityTimeout) {
        this(originalFs, maxNumOpenStreamsTotal, 0, 0, streamOpenTimeout, streamInactivityTimeout);
    }

    /**
     * Creates a new output connection limiting file system, limiting input and output streams with
     * potentially different quotas.
     *
     * <p>If streams are inactive (meaning not writing bytes) for longer than the given timeout,
     * then they are terminated as "inactive", to prevent that the limited number of connections
     * gets stuck on only blocked threads.
     *
     * @param originalFs The original file system to which connections are limited.
     * @param maxNumOpenStreamsTotal The maximum number of concurrent open streams (0 means no
     *     limit).
     * @param maxNumOpenOutputStreams The maximum number of concurrent open output streams (0 means
     *     no limit).
     * @param maxNumOpenInputStreams The maximum number of concurrent open input streams (0 means no
     *     limit).
     * @param streamOpenTimeout The maximum number of milliseconds that the file system will wait
     *     when no more connections are currently permitted.
     * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any bytes
     *     before it is closed as inactive.
     */
    public LimitedConnectionsFileSystem(
            FileSystem originalFs,
            int maxNumOpenStreamsTotal,
            int maxNumOpenOutputStreams,
            int maxNumOpenInputStreams,
            long streamOpenTimeout,
            long streamInactivityTimeout) {

        checkArgument(maxNumOpenStreamsTotal >= 0, "maxNumOpenStreamsTotal must be >= 0");
        checkArgument(maxNumOpenOutputStreams >= 0, "maxNumOpenOutputStreams must be >= 0");
        checkArgument(maxNumOpenInputStreams >= 0, "maxNumOpenInputStreams must be >= 0");
        checkArgument(
                streamOpenTimeout >= 0,
                "stream opening timeout must be >= 0 (0 means infinite timeout)");
        checkArgument(
                streamInactivityTimeout >= 0,
                "stream inactivity timeout must be >= 0 (0 means infinite timeout)");

        this.originalFs = checkNotNull(originalFs, "originalFs");
        this.lock = new ReentrantLock(true);
        this.available = lock.newCondition();
        this.openOutputStreams = new HashSet<>();
        this.openInputStreams = new HashSet<>();
        this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal;
        this.maxNumOpenOutputStreams = maxNumOpenOutputStreams;
        this.maxNumOpenInputStreams = maxNumOpenInputStreams;

        // assign nanos overflow aware
        final long openTimeoutNanos = streamOpenTimeout * 1_000_000;
        final long inactivityTimeoutNanos = streamInactivityTimeout * 1_000_000;

        this.streamOpenTimeoutNanos =
                openTimeoutNanos >= streamOpenTimeout ? openTimeoutNanos : Long.MAX_VALUE;

        this.streamInactivityTimeoutNanos =
                inactivityTimeoutNanos >= streamInactivityTimeout
                        ? inactivityTimeoutNanos
                        : Long.MAX_VALUE;
    }

    // ------------------------------------------------------------------------

    /** Gets the maximum number of concurrently open output streams. */
    public int getMaxNumOpenOutputStreams() {
        return maxNumOpenOutputStreams;
    }

    /** Gets the maximum number of concurrently open input streams. */
    public int getMaxNumOpenInputStreams() {
        return maxNumOpenInputStreams;
    }

    /** Gets the maximum number of concurrently open streams (input + output). */
    public int getMaxNumOpenStreamsTotal() {
        return maxNumOpenStreamsTotal;
    }

    /**
     * Gets the number of milliseconds that a opening a stream may wait for availability in the
     * connection pool.
     */
    public long getStreamOpenTimeout() {
        return streamOpenTimeoutNanos / 1_000_000;
    }

    /**
     * Gets the milliseconds that a stream may spend not writing any bytes before it is closed as
     * inactive.
     */
    public long getStreamInactivityTimeout() {
        return streamInactivityTimeoutNanos / 1_000_000;
    }

    /** Gets the total number of open streams (input plus output). */
    public int getTotalNumberOfOpenStreams() {
        lock.lock();
        try {
            return numReservedOutputStreams + numReservedInputStreams;
        } finally {
            lock.unlock();
        }
    }

    /** Gets the number of currently open output streams. */
    public int getNumberOfOpenOutputStreams() {
        lock.lock();
        try {
            return numReservedOutputStreams;
        } finally {
            lock.unlock();
        }
    }

    /** Gets the number of currently open input streams. */
    public int getNumberOfOpenInputStreams() {
        return numReservedInputStreams;
    }

    // ------------------------------------------------------------------------
    //  input & output stream opening methods
    // ------------------------------------------------------------------------

    @Override
    public FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException {
        return createOutputStream(() -> originalFs.create(f, overwriteMode));
    }

    @Override
    @Deprecated
    @SuppressWarnings("deprecation")
    public FSDataOutputStream create(
            Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
            throws IOException {

        return createOutputStream(
                () -> originalFs.create(f, overwrite, bufferSize, replication, blockSize));
    }

    @Override
    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
        return createInputStream(() -> originalFs.open(f, bufferSize));
    }

    @Override
    public FSDataInputStream open(Path f) throws IOException {
        return createInputStream(() -> originalFs.open(f));
    }

    private FSDataOutputStream createOutputStream(
            final SupplierWithException<FSDataOutputStream, IOException> streamOpener)
            throws IOException {

        final SupplierWithException<OutStream, IOException> wrappedStreamOpener =
                () -> new OutStream(streamOpener.get(), this);

        return createStream(wrappedStreamOpener, openOutputStreams, true);
    }

    private FSDataInputStream createInputStream(
            final SupplierWithException<FSDataInputStream, IOException> streamOpener)
            throws IOException {

        final SupplierWithException<InStream, IOException> wrappedStreamOpener =
                () -> new InStream(streamOpener.get(), this);

        return createStream(wrappedStreamOpener, openInputStreams, false);
    }

    // ------------------------------------------------------------------------
    //  other delegating file system methods
    // ------------------------------------------------------------------------

    @Override
    public FileSystemKind getKind() {
        return originalFs.getKind();
    }

    @Override
    public boolean isDistributedFS() {
        return originalFs.isDistributedFS();
    }

    @Override
    public Path getWorkingDirectory() {
        return originalFs.getWorkingDirectory();
    }

    @Override
    public Path getHomeDirectory() {
        return originalFs.getHomeDirectory();
    }

    @Override
    public URI getUri() {
        return originalFs.getUri();
    }

    @Override
    public FileStatus getFileStatus(Path f) throws IOException {
        return originalFs.getFileStatus(f);
    }

    @Override
    public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)
            throws IOException {
        return originalFs.getFileBlockLocations(file, start, len);
    }

    @Override
    public FileStatus[] listStatus(Path f) throws IOException {
        return originalFs.listStatus(f);
    }

    @Override
    public boolean delete(Path f, boolean recursive) throws IOException {
        return originalFs.delete(f, recursive);
    }

    @Override
    public boolean mkdirs(Path f) throws IOException {
        return originalFs.mkdirs(f);
    }

    @Override
    public boolean rename(Path src, Path dst) throws IOException {
        return originalFs.rename(src, dst);
    }

    @Override
    public boolean exists(Path f) throws IOException {
        return originalFs.exists(f);
    }

    @Override
    @Deprecated
    @SuppressWarnings("deprecation")
    public long getDefaultBlockSize() {
        return originalFs.getDefaultBlockSize();
    }

    // ------------------------------------------------------------------------

    private <T extends StreamWithTimeout> T createStream(
            final SupplierWithException<T, IOException> streamOpener,
            final HashSet<T> openStreams,
            final boolean output)
            throws IOException {

        final int outputLimit =
                output && maxNumOpenOutputStreams > 0 ? maxNumOpenOutputStreams : Integer.MAX_VALUE;
        final int inputLimit =
                !output && maxNumOpenInputStreams > 0 ? maxNumOpenInputStreams : Integer.MAX_VALUE;
        final int totalLimit =
                maxNumOpenStreamsTotal > 0 ? maxNumOpenStreamsTotal : Integer.MAX_VALUE;
        final int outputCredit = output ? 1 : 0;
        final int inputCredit = output ? 0 : 1;

        // because waiting for availability may take long, we need to be interruptible here
        // and handle interrupted exceptions as I/O errors
        // even though the code is written to make sure the lock is held for a short time only,
        // making the lock acquisition interruptible helps to guard against the cases where
        // a supposedly fast operation (like 'getPos()' on a stream) actually takes long.
        try {
            lock.lockInterruptibly();
            try {
                // some integrity checks
                assert openOutputStreams.size() <= numReservedOutputStreams;
                assert openInputStreams.size() <= numReservedInputStreams;

                // wait until there are few enough streams so we can open another
                waitForAvailability(totalLimit, outputLimit, inputLimit);

                // We do not open the stream here in the locked scope because opening a stream
                // could take a while. Holding the lock during that operation would block all
                // concurrent
                // attempts to try and open a stream, effectively serializing all calls to open the
                // streams.
                numReservedOutputStreams += outputCredit;
                numReservedInputStreams += inputCredit;
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            // restore interruption flag
            Thread.currentThread().interrupt();
            throw new IOException("interrupted before opening stream");
        }

        // open the stream outside the lock.
        boolean success = false;
        try {
            final T out = streamOpener.get();

            // add the stream to the set, need to re-acquire the lock
            lock.lock();
            try {
                openStreams.add(out);
            } finally {
                lock.unlock();
            }

            // good, can now return cleanly
            success = true;
            return out;
        } finally {
            if (!success) {
                // remove the reserved credit
                // we must open this non-interruptibly, because this must succeed!
                lock.lock();
                try {
                    numReservedOutputStreams -= outputCredit;
                    numReservedInputStreams -= inputCredit;
                    available.signalAll();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    @GuardedBy("lock")
    private void waitForAvailability(int totalLimit, int outputLimit, int inputLimit)
            throws InterruptedException, IOException {

        checkState(lock.isHeldByCurrentThread());

        // compute the deadline of this operations
        final long deadline;
        if (streamOpenTimeoutNanos == 0) {
            deadline = Long.MAX_VALUE;
        } else {
            long deadlineNanos = System.nanoTime() + streamOpenTimeoutNanos;
            // check for overflow
            deadline = deadlineNanos > 0 ? deadlineNanos : Long.MAX_VALUE;
        }

        // wait for available connections
        long timeLeft;

        if (streamInactivityTimeoutNanos == 0) {
            // simple case: just wait
            while ((timeLeft = (deadline - System.nanoTime())) > 0
                    && !hasAvailability(totalLimit, outputLimit, inputLimit)) {

                available.await(timeLeft, TimeUnit.NANOSECONDS);
            }
        } else {
            // complex case: chase down inactive streams
            final long checkIntervalNanos = (streamInactivityTimeoutNanos >>> 1) + 1;

            long now;
            while ((timeLeft = (deadline - (now = System.nanoTime()))) > 0
                    && // while still within timeout
                    !hasAvailability(totalLimit, outputLimit, inputLimit)) {

                // check all streams whether there in one that has been inactive for too long
                if (!(closeInactiveStream(openOutputStreams, now)
                        || closeInactiveStream(openInputStreams, now))) {
                    // only wait if we did not manage to close any stream.
                    // otherwise eagerly check again if we have availability now (we should have!)
                    long timeToWait = Math.min(checkIntervalNanos, timeLeft);
                    available.await(timeToWait, TimeUnit.NANOSECONDS);
                }
            }
        }

        // check for timeout
        // we check availability again to catch cases where the timeout expired while waiting
        // to re-acquire the lock
        if (timeLeft <= 0 && !hasAvailability(totalLimit, outputLimit, inputLimit)) {
            throw new IOException(
                    String.format(
                            "Timeout while waiting for an available stream/connection. "
                                    + "limits: total=%d, input=%d, output=%d ; Open: input=%d, output=%d ; timeout: %d ms",
                            maxNumOpenStreamsTotal,
                            maxNumOpenInputStreams,
                            maxNumOpenOutputStreams,
                            numReservedInputStreams,
                            numReservedOutputStreams,
                            getStreamOpenTimeout()));
        }
    }

    @GuardedBy("lock")
    private boolean hasAvailability(int totalLimit, int outputLimit, int inputLimit) {
        return numReservedOutputStreams < outputLimit
                && numReservedInputStreams < inputLimit
                && numReservedOutputStreams + numReservedInputStreams < totalLimit;
    }

    @GuardedBy("lock")
    private boolean closeInactiveStream(
            HashSet<? extends StreamWithTimeout> streams, long nowNanos) {
        for (StreamWithTimeout stream : streams) {
            try {
                final StreamProgressTracker tracker = stream.getProgressTracker();

                // If the stream is closed already, it will be removed anyways, so we
                // do not classify it as inactive. We also skip the check if another check happened
                // too recently.
                if (stream.isClosed()
                        || nowNanos
                                < tracker.getLastCheckTimestampNanos()
                                        + streamInactivityTimeoutNanos) {
                    // interval since last check not yet over
                    return false;
                } else if (!tracker.checkNewBytesAndMark(nowNanos)) {
                    stream.closeDueToTimeout();
                    return true;
                }
            } catch (StreamTimeoutException ignored) {
                // may happen due to races
            } catch (IOException e) {
                // only log on debug level here, to avoid log spamming
                LOG.debug("Could not check for stream progress to determine inactivity", e);
            }
        }

        return false;
    }

    // ------------------------------------------------------------------------

    /**
     * Atomically removes the given output stream from the set of currently open output streams, and
     * signals that new stream can now be opened.
     */
    void unregisterOutputStream(OutStream stream) {
        lock.lock();
        try {
            // only decrement if we actually remove the stream
            if (openOutputStreams.remove(stream)) {
                numReservedOutputStreams--;
                available.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes the given input stream from the set of currently open input streams, and
     * signals that new stream can now be opened.
     */
    void unregisterInputStream(InStream stream) {
        lock.lock();
        try {
            // only decrement if we actually remove the stream
            if (openInputStreams.remove(stream)) {
                numReservedInputStreams--;
                available.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // ------------------------------------------------------------------------

    /** A special IOException, indicating a timeout in the data output stream. */
    public static final class StreamTimeoutException extends IOException {

        private static final long serialVersionUID = -8790922066795901928L;

        public StreamTimeoutException() {
            super(
                    "Stream closed due to inactivity timeout. "
                            + "This is done to prevent inactive streams from blocking the full "
                            + "pool of limited connections");
        }

        public StreamTimeoutException(StreamTimeoutException other) {
            super(other.getMessage(), other);
        }
    }

    // ------------------------------------------------------------------------

    /** Interface for streams that can be checked for inactivity. */
    private interface StreamWithTimeout extends Closeable {

        /** Gets the progress tracker for this stream. */
        StreamProgressTracker getProgressTracker();

        /** Gets the current position in the stream, as in number of bytes read or written. */
        long getPos() throws IOException;

        /**
         * Closes the stream asynchronously with a special exception that indicates closing due to
         * lack of progress.
         */
        void closeDueToTimeout() throws IOException;

        /** Checks whether the stream was closed already. */
        boolean isClosed();
    }

    // ------------------------------------------------------------------------

    /**
     * A tracker for stream progress. This records the number of bytes read / written together with
     * a timestamp when the last check happened.
     */
    private static final class StreamProgressTracker {

        /** The tracked stream. */
        private final StreamWithTimeout stream;

        /**
         * The number of bytes written the last time that the {@link #checkNewBytesAndMark(long)}
         * method was called. It is important to initialize this with {@code -1} so that the first
         * check (0 bytes) always appears to have made progress.
         */
        private volatile long lastCheckBytes = -1;

        /** The timestamp when the last inactivity evaluation was made. */
        private volatile long lastCheckTimestampNanos;

        StreamProgressTracker(StreamWithTimeout stream) {
            this.stream = stream;
        }

        /** Gets the timestamp when the last inactivity evaluation was made. */
        public long getLastCheckTimestampNanos() {
            return lastCheckTimestampNanos;
        }

        /**
         * Checks whether there were new bytes since the last time this method was invoked. This
         * also sets the given timestamp, to be read via {@link #getLastCheckTimestampNanos()}.
         *
         * @return True, if there were new bytes, false if not.
         */
        public boolean checkNewBytesAndMark(long timestamp) throws IOException {
            // remember the time when checked
            lastCheckTimestampNanos = timestamp;

            final long bytesNow = stream.getPos();
            if (bytesNow > lastCheckBytes) {
                lastCheckBytes = bytesNow;
                return true;
            } else {
                return false;
            }
        }
    }

    // ------------------------------------------------------------------------

    /**
     * A data output stream that wraps a given data output stream and un-registers from a given
     * connection-limiting file system (via {@link
     * LimitedConnectionsFileSystem#unregisterOutputStream(OutStream)} upon closing.
     */
    private static final class OutStream extends FSDataOutputStream implements StreamWithTimeout {

        /** The original data output stream to write to. */
        private final FSDataOutputStream originalStream;

        /** The connection-limiting file system to un-register from. */
        private final LimitedConnectionsFileSystem fs;

        /** The progress tracker for this stream. */
        private final StreamProgressTracker progressTracker;

        /** An exception with which the stream has been externally closed. */
        private volatile StreamTimeoutException timeoutException;

        /** Flag tracking whether the stream was already closed, for proper inactivity tracking. */
        private final AtomicBoolean closed = new AtomicBoolean();

        OutStream(FSDataOutputStream originalStream, LimitedConnectionsFileSystem fs) {
            this.originalStream = checkNotNull(originalStream);
            this.fs = checkNotNull(fs);
            this.progressTracker = new StreamProgressTracker(this);
        }

        // --- FSDataOutputStream API implementation

        @Override
        public void write(int b) throws IOException {
            try {
                originalStream.write(b);
            } catch (IOException e) {
                handleIOException(e);
            }
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            try {
                originalStream.write(b, off, len);
            } catch (IOException e) {
                handleIOException(e);
            }
        }

        @Override
        public long getPos() throws IOException {
            try {
                return originalStream.getPos();
            } catch (IOException e) {
                handleIOException(e);
                return -1; // silence the compiler
            }
        }

        @Override
        public void flush() throws IOException {
            try {
                originalStream.flush();
            } catch (IOException e) {
                handleIOException(e);
            }
        }

        @Override
        public void sync() throws IOException {
            try {
                originalStream.sync();
            } catch (IOException e) {
                handleIOException(e);
            }
        }

        @Override
        public void close() throws IOException {
            if (closed.compareAndSet(false, true)) {
                try {
                    originalStream.close();
                } catch (IOException e) {
                    handleIOException(e);
                } finally {
                    fs.unregisterOutputStream(this);
                }
            }
        }

        @Override
        public void closeDueToTimeout() throws IOException {
            this.timeoutException = new StreamTimeoutException();
            close();
        }

        @Override
        public boolean isClosed() {
            return closed.get();
        }

        @Override
        public StreamProgressTracker getProgressTracker() {
            return progressTracker;
        }

        private void handleIOException(IOException exception) throws IOException {
            if (timeoutException == null) {
                throw exception;
            } else {
                // throw a new exception to capture this call's stack trace
                // the new exception is forwarded as a suppressed exception
                StreamTimeoutException te = new StreamTimeoutException(timeoutException);
                te.addSuppressed(exception);
                throw te;
            }
        }
    }

    /**
     * A data input stream that wraps a given data input stream and un-registers from a given
     * connection-limiting file system (via {@link
     * LimitedConnectionsFileSystem#unregisterInputStream(InStream)} upon closing.
     */
    private static final class InStream extends FSDataInputStream implements StreamWithTimeout {

        /** The original data input stream to read from. */
        private final FSDataInputStream originalStream;

        /** The connection-limiting file system to un-register from. */
        private final LimitedConnectionsFileSystem fs;

        /** An exception with which the stream has been externally closed. */
        private volatile StreamTimeoutException timeoutException;

        /** The progress tracker for this stream. */
        private final StreamProgressTracker progressTracker;

        /** Flag tracking whether the stream was already closed, for proper inactivity tracking. */
        private final AtomicBoolean closed = new AtomicBoolean();

        InStream(FSDataInputStream originalStream, LimitedConnectionsFileSystem fs) {
            this.originalStream = checkNotNull(originalStream);
            this.fs = checkNotNull(fs);
            this.progressTracker = new StreamProgressTracker(this);
        }

        // --- FSDataOutputStream API implementation

        @Override
        public int read() throws IOException {
            try {
                return originalStream.read();
            } catch (IOException e) {
                handleIOException(e);
                return 0; // silence the compiler
            }
        }

        @Override
        public int read(byte[] b) throws IOException {
            try {
                return originalStream.read(b);
            } catch (IOException e) {
                handleIOException(e);
                return 0; // silence the compiler
            }
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            try {
                return originalStream.read(b, off, len);
            } catch (IOException e) {
                handleIOException(e);
                return 0; // silence the compiler
            }
        }

        @Override
        public long skip(long n) throws IOException {
            try {
                return originalStream.skip(n);
            } catch (IOException e) {
                handleIOException(e);
                return 0L; // silence the compiler
            }
        }

        @Override
        public int available() throws IOException {
            try {
                return originalStream.available();
            } catch (IOException e) {
                handleIOException(e);
                return 0; // silence the compiler
            }
        }

        @Override
        public void mark(int readlimit) {
            originalStream.mark(readlimit);
        }

        @Override
        public void reset() throws IOException {
            try {
                originalStream.reset();
            } catch (IOException e) {
                handleIOException(e);
            }
        }

        @Override
        public boolean markSupported() {
            return originalStream.markSupported();
        }

        @Override
        public void seek(long desired) throws IOException {
            try {
                originalStream.seek(desired);
            } catch (IOException e) {
                handleIOException(e);
            }
        }

        @Override
        public long getPos() throws IOException {
            try {
                return originalStream.getPos();
            } catch (IOException e) {
                handleIOException(e);
                return 0; // silence the compiler
            }
        }

        @Override
        public void close() throws IOException {
            if (closed.compareAndSet(false, true)) {
                try {
                    originalStream.close();
                } catch (IOException e) {
                    handleIOException(e);
                } finally {
                    fs.unregisterInputStream(this);
                }
            }
        }

        @Override
        public void closeDueToTimeout() throws IOException {
            this.timeoutException = new StreamTimeoutException();
            close();
        }

        @Override
        public boolean isClosed() {
            return closed.get();
        }

        @Override
        public StreamProgressTracker getProgressTracker() {
            return progressTracker;
        }

        private void handleIOException(IOException exception) throws IOException {
            if (timeoutException == null) {
                throw exception;
            } else {
                // throw a new exception to capture this call's stack trace
                // the new exception is forwarded as a suppressed exception
                StreamTimeoutException te = new StreamTimeoutException(timeoutException);
                te.addSuppressed(exception);
                throw te;
            }
        }
    }

    // ------------------------------------------------------------------------

    /** A simple configuration data object capturing the settings for limited connections. */
    public static class ConnectionLimitingSettings {

        /** The limit for the total number of connections, or 0, if no limit. */
        public final int limitTotal;

        /** The limit for the number of input stream connections, or 0, if no limit. */
        public final int limitInput;

        /** The limit for the number of output stream connections, or 0, if no limit. */
        public final int limitOutput;

        /** The stream opening timeout for a stream, in milliseconds. */
        public final long streamOpenTimeout;

        /** The inactivity timeout for a stream, in milliseconds. */
        public final long streamInactivityTimeout;

        /**
         * Creates a new ConnectionLimitingSettings with the given parameters.
         *
         * @param limitTotal The limit for the total number of connections, or 0, if no limit.
         * @param limitInput The limit for the number of input stream connections, or 0, if no
         *     limit.
         * @param limitOutput The limit for the number of output stream connections, or 0, if no
         *     limit.
         * @param streamOpenTimeout The maximum number of milliseconds that the file system will
         *     wait when no more connections are currently permitted.
         * @param streamInactivityTimeout The milliseconds that a stream may spend not writing any
         *     bytes before it is closed as inactive.
         */
        public ConnectionLimitingSettings(
                int limitTotal,
                int limitInput,
                int limitOutput,
                long streamOpenTimeout,
                long streamInactivityTimeout) {
            checkArgument(limitTotal >= 0);
            checkArgument(limitInput >= 0);
            checkArgument(limitOutput >= 0);
            checkArgument(streamOpenTimeout >= 0);
            checkArgument(streamInactivityTimeout >= 0);

            this.limitTotal = limitTotal;
            this.limitInput = limitInput;
            this.limitOutput = limitOutput;
            this.streamOpenTimeout = streamOpenTimeout;
            this.streamInactivityTimeout = streamInactivityTimeout;
        }

        // --------------------------------------------------------------------

        /**
         * Parses and returns the settings for connection limiting, for the file system with the
         * given file system scheme.
         *
         * @param config The configuration to check.
         * @param fsScheme The file system scheme.
         * @return The parsed configuration, or null, if no connection limiting is configured.
         */
        @Nullable
        public static ConnectionLimitingSettings fromConfig(Configuration config, String fsScheme) {
            checkNotNull(fsScheme, "fsScheme");
            checkNotNull(config, "config");

            final ConfigOption<Integer> totalLimitOption =
                    CoreOptions.fileSystemConnectionLimit(fsScheme);
            final ConfigOption<Integer> limitInOption =
                    CoreOptions.fileSystemConnectionLimitIn(fsScheme);
            final ConfigOption<Integer> limitOutOption =
                    CoreOptions.fileSystemConnectionLimitOut(fsScheme);

            final int totalLimit = config.getInteger(totalLimitOption);
            final int limitIn = config.getInteger(limitInOption);
            final int limitOut = config.getInteger(limitOutOption);

            checkLimit(totalLimit, totalLimitOption);
            checkLimit(limitIn, limitInOption);
            checkLimit(limitOut, limitOutOption);

            // create the settings only, if at least one limit is configured
            if (totalLimit <= 0 && limitIn <= 0 && limitOut <= 0) {
                // no limit configured
                return null;
            } else {
                final ConfigOption<Long> openTimeoutOption =
                        CoreOptions.fileSystemConnectionLimitTimeout(fsScheme);
                final ConfigOption<Long> inactivityTimeoutOption =
                        CoreOptions.fileSystemConnectionLimitStreamInactivityTimeout(fsScheme);

                final long openTimeout = config.getLong(openTimeoutOption);
                final long inactivityTimeout = config.getLong(inactivityTimeoutOption);

                checkTimeout(openTimeout, openTimeoutOption);
                checkTimeout(inactivityTimeout, inactivityTimeoutOption);

                return new ConnectionLimitingSettings(
                        totalLimit == -1 ? 0 : totalLimit,
                        limitIn == -1 ? 0 : limitIn,
                        limitOut == -1 ? 0 : limitOut,
                        openTimeout,
                        inactivityTimeout);
            }
        }

        private static void checkLimit(int value, ConfigOption<Integer> option) {
            if (value < -1) {
                throw new IllegalConfigurationException(
                        "Invalid value for '" + option.key() + "': " + value);
            }
        }

        private static void checkTimeout(long timeout, ConfigOption<Long> option) {
            if (timeout < 0) {
                throw new IllegalConfigurationException(
                        "Invalid value for '" + option.key() + "': " + timeout);
            }
        }
    }
}
